#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>
#include <sys/mman.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "limits.h"
#include "adt.h"
#include "sysy_lib.h"
#include "bmap.h"
#include "lich_md.h"
#include "net_table.h"
#include "configure.h"
#include "../replica/replica.h"
#include "disk.h"
#include "../storage/md_parent.h"
#include "disk_redis.h"
#include "pool_ctl.h"
#include "volume_ctl.h"
#include "stor_ctl.h"
#include "table_proto.h"
#include "pool_proto.h"
#include "md_proto.h"
#include "net_global.h"
#include "nodectl.h"
#include "castoff.h"
#include "job_dock.h"
#include "ylog.h"
#include "dbg.h"
#include "metadata.h"

/*
 * One castoff work item: a chunk found on the disk being removed.
 * __castoff_scan__() appends these records to an unlinked temporary file,
 * which __castoff_worker() later mmap()s and splits among worker threads.
 */
typedef struct {
        chkid_t chkid;          /* chunk to move off the disk */
        uint32_t disk;          /* disk index being removed (copied from arg_t.disk) */
        char pool[MAX_NAME_LEN]; /* name of the pool the chunk belongs to */
} res_t;

/*
 * Per-worker-thread slice of the castoff record array.
 * Fields are written both by the worker thread (progress counters,
 * stopped) and by the coordinating thread (stop); they are plain ints
 * with no locking.
 */
typedef struct {
        const res_t *res;       /* first record of this slice (inside arg_t.addr) */
        int count;              /* number of records in this slice */
        int success;            /* records this worker cast off successfully */
        int fail;               /* records this worker failed to cast off */
        int stop;               /* set to 1 by __castoff_stop() to ask the worker to exit */
        int stopped;            /* set to 1 by the worker when its loop exits */
        int running;            /* set to 1 at startup; not cleared in the code visible here */
} castoff_seg_t;

/*
 * Phase of a castoff run. __castoff_dump() publishes it under
 * "castoff/info" as "scan", "running", "stopped", "failed" or "done".
 */
typedef enum {
        __SCAN__,       /* records have been collected from the disk */
        __RUNNING__,    /* worker threads are casting chunks off */
        __STOPPED__,    /* stopped on request, no failures recorded */
        __FAILED__,     /* stopped with at least one failed record */
        __DONE__,       /* every record cast off and the disk removed */
} castoff_status_t;

/*
 * State of one castoff run; a local variable of __castoff_worker().
 */
typedef struct {
        int fd;                 /* unlinked temp file holding res_t records (from mkstemp) */
        castoff_status_t status; /* current phase, published by __castoff_dump() */
        uint64_t castoff;       /* number of records written during the scan */
        uint64_t success;       /* sum of castoff_seg_t.success over all workers */
        uint64_t fail;          /* sum of castoff_seg_t.fail over all workers */
        void *addr;             /* read-only mmap() of the record file */
        uint32_t disk;          /* disk index being removed, or DISK_ALL_IDX */
} arg_t;

/* Upper bound on castoff worker threads; also the size of _segs[] in __castoff_worker(). */
#define THREAD_MAX 10

/*
 * Non-zero while a castoff is in progress on this node. Written by
 * __castoff_set_deleting() and checked by castoff_disk() so that only one
 * castoff runs at a time. Plain int, no locking.
 */
static int __deleting__;

/*
 * Count the replicas of a chunk that are not usable.
 *
 * A replica is healthy only if its status is 0 AND network_connect() to
 * its node succeeds; replicas with a non-zero status are never probed.
 * When no replica is healthy the chunk is logged as lost.
 *
 * Returns chkinfo->repnum minus the number of healthy replicas, so 0 means
 * every replica is healthy and no repair is needed.
 */
static int __recovery_needcheck(const chkinfo_t *chkinfo)
{
        char buf[MAX_BUF_LEN];
        int idx, healthy = 0;

        for (idx = 0; idx < chkinfo->repnum; idx++) {
                if (!chkinfo->diskid[idx].status &&
                    !network_connect(&chkinfo->diskid[idx].id, NULL, 3, 0))
                        healthy++;
        }

        if (!healthy) {
                CHKINFO_STR(chkinfo, buf);
                DERROR("lost %s\n", buf);
        }

        return chkinfo->repnum - healthy;
}

/*
 * Ask the metadata service (via the parent's node) to check/repair a chunk,
 * but only when __recovery_needcheck() finds at least one replica that is
 * flagged or unreachable. EINVAL from md_chunk_check() is treated as a bug.
 *
 * Returns 0 when nothing needed checking or the check succeeded, otherwise
 * the md_chunk_check() error.
 */
static int __recovery_chunk(const nid_t *parentnid,
                            const fileid_t *parent, const chkinfo_t *chkinfo)
{
        int ret;

        /* every replica is clean and reachable: nothing to repair */
        if (__recovery_needcheck(chkinfo) == 0)
                return 0;

        ret = md_chunk_check(parentnid, parent, &chkinfo->id, -1, 0, 0, NULL);
        if (unlikely(ret)) {
                YASSERT(ret != EINVAL);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Read the requested number of castoff worker threads from the nodectl
 * key "castoff/thread" (default "1"). A nodectl failure is fatal.
 */
static void __castoff_get_thread(int *thread)
{
        char key[MAX_PATH_LEN] = "castoff/thread";
        char value[MAX_BUF_LEN];
        int rc = nodectl_get(key, value, "1");

        YASSERT(rc == 0);
        *thread = atoi(value);
}

/*
 * Read the operator stop request from the nodectl key "castoff/stop"
 * (default "0"); a non-zero value asks the castoff to stop.
 * A nodectl failure is fatal.
 */
static void __castoff_get_stop(int *stop)
{
        char key[MAX_PATH_LEN] = "castoff/stop";
        char value[MAX_BUF_LEN];
        int rc = nodectl_get(key, value, "0");

        YASSERT(rc == 0);
        *stop = atoi(value);
}

/*
 * Report whether the process that launched the castoff has gone away.
 *
 * The launcher's pid is read from "castoff/launch_pid" (default "0").
 * A pid of 0 means no launcher is tracked, and *stop is 0. Otherwise *stop
 * is 1 only when /proc/<pid> no longer exists (ENOENT); any other stat()
 * failure is treated as fatal.
 */
static void __castoff_get_launch_stop(int *stop)
{
        char key[MAX_PATH_LEN] = "castoff/launch_pid";
        char value[MAX_BUF_LEN];
        char proc_dir[MAX_PATH_LEN];
        struct stat st;
        int rc, launcher;

        rc = nodectl_get(key, value, "0");
        YASSERT(rc == 0);

        launcher = atoi(value);

        /* the launcher counts as alive unless its /proc entry is missing */
        *stop = 0;
        if (launcher == 0)
                return;

        snprintf(proc_dir, MAX_PATH_LEN, "/proc/%d", launcher);
        if (unlikely(stat(proc_dir, &st) != 0)) {
                if (errno == ENOENT)
                        *stop = 1;
                else
                        YASSERT(0);
        }
}

/*
 * Reset the castoff control keys before a new run: "castoff/stop",
 * "castoff/done" and "castoff/launch_pid" are all set to "0", in that
 * order. Any nodectl failure is fatal.
 */
static void __castoff_set_init()
{
        static const char *const keys[] = {
                "castoff/stop",
                "castoff/done",
                "castoff/launch_pid",
        };
        char key[MAX_PATH_LEN];
        size_t n;

        for (n = 0; n < sizeof(keys) / sizeof(keys[0]); n++) {
                snprintf(key, sizeof(key), "%s", keys[n]);
                if (unlikely(nodectl_set(key, "0")))
                        UNIMPLEMENTED(__DUMP__);
        }
}

/* Mark the castoff as finished: "castoff/done" = "1". Failure is fatal. */
static void __castoff_set_done()
{
        char key[MAX_PATH_LEN] = "castoff/done";

        if (unlikely(nodectl_set(key, "1")))
                UNIMPLEMENTED(__DUMP__);
}

/*
 * Publish the current castoff progress under the nodectl key
 * "castoff/info" as a text record of "name:value" lines
 * (status, castoff, success, fail). A nodectl failure is fatal.
 */
static void __castoff_dump(const arg_t *arg)
{
        int ret;
        char key[MAX_PATH_LEN], value[MAX_BUF_LEN];
        const char *status;

        switch (arg->status) {
        case __SCAN__:
                status = "scan";
                break;
        case __RUNNING__:
                status = "running";
                break;
        case __STOPPED__:
                status = "stopped";
                break;
        case __FAILED__:
                status = "failed";
                break;
        case __DONE__:
                status = "done";
                break;
        default:
                status = "unknow";
                break;
        }

        snprintf(key, sizeof(key), "castoff/info");
        /* bound by the real size of value (MAX_BUF_LEN), not MAX_PATH_LEN */
        snprintf(value, sizeof(value),
                 "status:%s\n"
                 "castoff:%llu\n"
                 "success:%llu\n"
                 "fail:%llu\n",
                 status,
                 (LLU)arg->castoff,
                 (LLU)arg->success,
                 (LLU)arg->fail);

        ret = nodectl_set(key, value);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);
}

static int __castoff_scan__(const chkid_t *chkid, const char *pool, const diskloc_t *loc,
                                 const chkid_t *parent, const uint64_t meta_version, void *_arg)
{
        arg_t *arg = _arg;
        res_t res;
        int ret;

        res.chkid = *chkid;
        res.disk = arg->disk;
        strcpy(res.pool, pool);

        ret = write(arg->fd, &res, sizeof(res_t));
        (void) ret;
        arg->castoff++;

        return 0;
}

/*
 * Collect every chunk record on disk `idx` (all disks when idx is
 * DISK_ALL_IDX) into a fresh, already-unlinked temporary file and reset
 * the progress counters in arg. On return, arg->fd is the record file,
 * arg->castoff is the number of records written, and the status is __SCAN__.
 *
 * Failure to create the temporary file is fatal.
 */
static void __castoff_scan(arg_t *arg, int idx)
{
        int fd, diskid;
        char path[MAX_PATH_LEN];

        snprintf(path, sizeof(path), "/tmp/castoff-XXXXXX");
        fd = mkstemp(path);
        if (unlikely(fd < 0)) {
                DERROR("create castoff file %s fail, errno %d\n", path, errno);
                UNIMPLEMENTED(__DUMP__);
        }

        /* Only the descriptor is used from here on; unlinking now means the
         * file disappears automatically once it is closed. */
        unlink(path);

        arg->fd = fd;
        arg->castoff = 0;
        arg->success = 0;
        arg->fail = 0;
        arg->disk = idx;

        // Scan every record on the disk, including the cache-disk case
        diskid = idx == DISK_ALL_IDX ? -1 : idx;

        /* NOTE(review): the iterator's result is not checked; if it can fail
         * part-way, arg->castoff would under-count the disk's chunks — confirm. */
        disk_maping->iterator_bydisk(diskid, __castoff_scan__, arg,
                                     DM_FLAG_MD | DM_FLAG_RAW);

        arg->status = __SCAN__;
        __castoff_dump(arg);
}

/*
 * Read this node's id from "<gloconf.home>/data/node/config/nid" and parse
 * it with str2nid(). Used when net_getnid() has not been set yet.
 *
 * Returns 0 on success, or the (positive) error from _get_text().
 */
static int __castoff_getlocalnid(nid_t *nid)
{
        int ret;
        char path[MAX_PATH_LEN], tmp[MAX_MSG_SIZE];

        /* bound by the size of path (MAX_PATH_LEN); MAX_NAME_LEN could truncate it */
        snprintf(path, sizeof(path), "%s/data/node/config/nid", gloconf.home);

        ret = _get_text(path, tmp, MAX_NAME_LEN);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        str2nid(nid, tmp);

        return 0;
err_ret:
        return ret;
}

/*
 * Set or clear the "deleting" mark for disk idx. While the mark is set, no
 * new data is written to the disk.
 *
 * diskmd_set_deleting() updates the local disk metadata. When idx is
 * DISK_ALL_IDX, cluster_set_deleting() is also called and retried through
 * USLEEP_RETRY (presumably up to 50 attempts, 100 ms apart; see the macro
 * definition). Finally the module-wide __deleting__ flag is updated.
 *
 * Any failure is fatal (YASSERT(0)), so in practice this returns 0.
 */
static int __castoff_set_deleting(uint32_t idx, int deleting)
{
        int ret, retry = 0;

        ret = diskmd_set_deleting(idx, deleting);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* label passed to USLEEP_RETRY below as its retry target */
retry:
        if (idx == DISK_ALL_IDX) {
                ret = cluster_set_deleting(deleting);
                if (unlikely(ret)) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                }
        }

        __deleting__ = deleting;

        return 0;
err_ret:
        YASSERT(0);
        return ret;
}

/*
 * Build the new replica location list for chkinfo when casting off node
 * `from`.
 *
 * Every replica on `from` is replaced by a node picked by
 * dispatch_newdisk2() from `pool`. The pick skips all current replica nodes
 * and any node already picked here. If no node is available (ENOSPC), that
 * replica is dropped, so the replica count may shrink. Replicas on other
 * nodes are copied unchanged.
 *
 * @param pool        pool to dispatch new locations from
 * @param chkinfo     current replica layout of the chunk
 * @param _dist       out: new replica list; the caller must provide room
 *                    for at least chkinfo->repnum entries
 * @param _dist_count out: number of entries written to _dist
 * @param from        node whose replicas are being cast off
 * @param _new        out: number of replicas that were on `from` (counted
 *                    even when no replacement could be dispatched)
 * @return 0; ENOSPC if fewer than LICH_REPLICA_MIN replicas would remain;
 *         or another error from dispatch_newdisk2() (outputs not set then).
 */
static int __castoff_get_dist(const char *pool, const chkinfo_t *chkinfo, nid_t *_dist, int *_dist_count, const nid_t *from, int *_new)
{
        int ret, i, new;
        int dist_index, skip_count;
        /* current replicas + at most one new pick per replica
         * (assumes repnum <= LICH_REPLICA_MAX — TODO confirm) */
        nid_t skip[LICH_REPLICA_MAX*2];

        /* NOTE(review): redundant, `from` is used below */
        (void) from;

        /* never pick a node that already holds a replica */
        skip_count = chkinfo->repnum;
        for (i = 0; i < chkinfo->repnum; i++) {
                skip[i] = chkinfo->diskid[i].id ;
        }

        dist_index = 0;
        new = 0;

        for (i = 0; i < chkinfo->repnum; i++) {
                if (nid_cmp(from,  &chkinfo->diskid[i].id) == 0) {
                        new++;

                        ret = dispatch_newdisk2(pool, &_dist[dist_index],
                                                skip, skip_count);
                        if (unlikely(ret)) {
                                if (ret == ENOSPC) {
                                        /* drop this replica instead of failing */
                                        DINFO("replica may decrease, "CHKID_FORMAT"\n", CHKID_ARG(&chkinfo->id));
                                        continue;
                                }
                                GOTO(err_ret, ret);
                        }

                        /* do not pick the same node twice */
                        skip[skip_count] = _dist[dist_index];
                        skip_count++;
                        dist_index++;
                } else {
                        _dist[dist_index] = chkinfo->diskid[i].id;
                        dist_index++;
                }
        }

        *_dist_count = dist_index;
        *_new = new;

        /* sanity check: the new list must no longer contain `from` */
        for (i = 0; i < *_dist_count; i++) {
                if (nid_cmp(from, &_dist[i]) == 0) {
                        YASSERT(0);
                }
        }

        if (*_dist_count< LICH_REPLICA_MIN) {
                DWARN("at least two replica, "CHKID_FORMAT"\n", CHKID_ARG(&chkinfo->id));
                ret = ENOSPC;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Move the replica(s) of chkid that live on node `from` to other nodes,
 * through md_chunk_move().
 *
 * Steps:
 *  1. Look up the chunk's parent (md_parent_get) and fetch chkinfo for the
 *     chunk and for its parent. For a root parent, the chunk's own chkinfo
 *     is reused as the parent's.
 *  2. Let __recovery_chunk() check the chunk first if any replica is
 *     flagged or unreachable.
 *  3. Compute the new replica list with __castoff_get_dist().
 *  4. If no replica was on `from`, there is nothing to do.
 *  5. Otherwise call md_chunk_move() on the controlling object: the chunk
 *     itself for pool/volume chunks, the parent for every other chunk type.
 *     The call goes to that object's first replica node.
 *
 * Returns 0 on success, or the first error encountered (including ENOSPC
 * from __castoff_get_dist()).
 */
static int  __castoff_chunk(const char *pool, const chkid_t *chkid, const nid_t *from)
{
        int ret, dist_count, new;
        fileid_t _parent;
        fileid_t *parent = &_parent;
        nid_t dist[LICH_REPLICA_MAX+1];
        nid_t nid;
        fileid_t srv;

        chkinfo_t *chkinfo, *parent_chkinfo;
        char buf[CHKINFO_MAX];
        char buf2[CHKINFO_MAX];

        chkinfo = (void *)buf;
        parent_chkinfo = (void *)buf2;

        ret = md_parent_get(chkid, parent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("get parent of "CHKID_FORMAT" is "CHKID_FORMAT "\n", CHKID_ARG(chkid), CHKID_ARG(parent));

        ret = md_chunk_getinfo(pool, parent, chkid, chkinfo, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* a root parent has no separate chkinfo: reuse the chunk's own */
        if (chkid_isroot(parent)) {
                memcpy(parent_chkinfo, chkinfo, CHKINFO_SIZE(chkinfo->repnum));
        } else {
                ret = md_chunk_getinfo(pool, NULL, parent, parent_chkinfo, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ret = __recovery_chunk(&parent_chkinfo->diskid[0].id, parent, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __castoff_get_dist(pool, chkinfo, dist, &dist_count, from, &new);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (new == 0) {
                DINFO("need not cast\n");
                goto out;
        }

        /* pool/volume chunks are controlled by themselves, others by their parent */
        if (chkid->type == __POOL_CHUNK__ || chkid->type == __VOLUME_CHUNK__) {
                nid = chkinfo->diskid[0].id;
                srv = chkinfo->id;
        } else {
                nid = parent_chkinfo->diskid[0].id;
                srv = *parent;
        }

        ret = md_chunk_move(&nid, &srv, chkid, (const nid_t *)dist, dist_count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

out:
        return 0;
err_ret:
        return ret;
}

/*
 * Call __castoff_chunk() with retries.
 *
 * ENOSPC is returned immediately without retrying. Any other error is
 * retried through USLEEP_RETRY (presumably up to 180 attempts, 100 ms
 * apart; see the macro definition) and is returned if it persists.
 */
static int  __castoff_chunk_retry(const char *pool, const chkid_t *chkid, const nid_t *from)
{
        int ret, retry;

        retry = 0;
        /* label passed to USLEEP_RETRY below as its retry target */
retry:
        ret = __castoff_chunk(pool, chkid, from);
        if (unlikely(ret)) {
                if (ret == ENOSPC) {
                        GOTO(err_ret, ret);
                }

                USLEEP_RETRY(err_ret, ret, retry, retry, 180, (100 * 1000));
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Call replica_srv_cast(chkid, from), retrying transient EAGAIN/EBUSY
 * results through USLEEP_RETRY (presumably up to 10 attempts, 100 ms apart;
 * see the macro definition). Any other error is returned immediately.
 */
static int __replica_srv_cast_retry(const chkid_t *chkid, uint32_t from)
{
        int ret, retry;

        retry = 0;
        /* label passed to USLEEP_RETRY below as its retry target */
retry:
        ret = replica_srv_cast(chkid, from);
        if (unlikely(ret)) {
                if (ret == EAGAIN || ret == EBUSY) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 10, (100 * 1000));
                }
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Cast off one scanned record.
 *
 * First tries replica_srv_cast() via __replica_srv_cast_retry(), passing
 * the record's disk index as `from`. If that reports ENOSPC, it falls back
 * to __castoff_chunk_retry(), which moves the replica off this node (nid)
 * entirely. nid comes from net_getnid(); if that is null, it is read from
 * the local nid file by __castoff_getlocalnid().
 *
 * ENOENT at either step means the chunk no longer exists; it is logged and
 * counted as success.
 *
 * Returns 0 on success; otherwise the error is logged and returned.
 */
static int __castoff_thread_cast(const res_t *res)
{
        int ret, disk;
        chkid_t chkid;
        nid_t nid;

        chkid = res->chkid;
        disk = res->disk;

        nid = *net_getnid();
        if (net_isnull(&nid)) {
                ret = __castoff_getlocalnid(&nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* NOTE(review): a connect failure is reported as ENOSPC — intent unclear, confirm */
                ret = network_connect1(&nid);
                if (unlikely(ret)) {
                        ret = ENOSPC;
                        GOTO(err_ret, ret);
                }
        }

        ret = __replica_srv_cast_retry(&chkid, disk);
        if (unlikely(ret)) {
                if (ret == ENOSPC) {
                        /* no room on this node: move the replica to another node */
                        DWARN("try cast to other node, chunk "CHKID_FORMAT"\n", CHKID_ARG(&chkid));
                        ret = __castoff_chunk_retry(res->pool, &chkid, &nid);
                        if (unlikely(ret)) {
                                if (ret == ENOENT) {
                                        DWARN("chunk "CHKID_FORMAT" was gone\n", CHKID_ARG(&chkid));
                                } else
                                        GOTO(err_ret, ret);
                        }
                } else if (ret == ENOENT) {
                        DWARN("chunk "CHKID_FORMAT" was gone\n", CHKID_ARG(&chkid));
                } else {
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        DERROR("chunk "CHKID_FORMAT" cast, form "NID_FORMAT", disk %d, ret: %d\n", CHKID_ARG(&chkid), NID_ARG(&nid), disk, ret);
        return ret;
}

static void *__castoff_thread_worker(void *arg)
{
        int ret, i;
        castoff_seg_t *seg = arg;

        DINFO("castoff start count %u\n", seg->count);

        for (i = 0; i < seg->count; i++) {
                if (seg->stop) {
                        DINFO("castoff force exit\n");
                        break;
                }

                ret = __castoff_thread_cast(&seg->res[i]);
                if (ret == 0) {
                        seg->success++;
                        DBUG("success %u\n", seg->success);
                } else {
                        seg->fail++;
                        DBUG("fail %u\n", seg->fail);
                }
        }

        seg->stopped = 1;

        DINFO("castoff exit\n");
        return NULL;
}

/*
 * Start a detached thread running __castoff_thread_worker() on seg.
 * Returns 0, or the pthread_create() error.
 */
static int __castoff_thread_create(castoff_seg_t *seg)
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __castoff_thread_worker, seg);
        /* the attribute object is not needed after creation; release it on every path */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Split the arg->castoff records mapped at arg->addr into contiguous slices
 * and start one detached worker thread per slice.
 *
 * @param arg     castoff state; arg->addr must map arg->castoff res_t records
 * @param _segs   slice descriptors, at least THREAD_MAX entries
 * @param _thread requested thread count (from "castoff/thread")
 * @param count   out: number of threads actually started
 *
 * On return arg->status is __RUNNING__. Thread creation failure is fatal.
 */
static void __castoff_worker_startup(arg_t *arg, castoff_seg_t *_segs, int _thread, int *count)
{
        int ret, thread, step, left, i;
        castoff_seg_t *seg;

        thread = THREAD_MAX < _thread ? THREAD_MAX : _thread;
        /* _thread is parsed with atoi() from a nodectl value; a zero or
         * negative setting would make the division below divide by zero. */
        if (thread < 1)
                thread = 1;
        /* small jobs are handled by a single thread */
        thread = arg->castoff < 100 ? 1 : thread;
        /* the +THREAD_MAX pad makes step * thread >= castoff, since thread <= THREAD_MAX */
        step = (arg->castoff + THREAD_MAX) / thread;
        left = arg->castoff;

        DINFO("step %u left %u thread %u\n", step, left , thread);

        for (i = 0; i < thread; i++) {
                seg = &_segs[i];
                seg->res = arg->addr + (step * i * sizeof(res_t));
                seg->count = left < step ? left : step;
                seg->stop = 0;
                seg->fail = 0;
                seg->running = 1;
                seg->success = 0;
                seg->stopped = 0;
                left -= step;

                ret = __castoff_thread_create(seg);
                YASSERT(ret == 0);
        }

        *count = thread;
        arg->status = __RUNNING__;
}

/*
 * Ask all `thread` workers to stop, then wait (polling once per second,
 * asserting after 100 polls) until each has set its stopped flag.
 * Afterwards the run is marked __FAILED__ if any record failed, otherwise
 * __STOPPED__, and the state is published.
 */
static  void __castoff_stop(arg_t *arg, castoff_seg_t *seg, int thread)
{
        int n, finished, attempt;

        for (n = 0; n < thread; n++)
                seg[n].stop = 1;

        for (attempt = 0; ; attempt++) {
                finished = 0;
                for (n = 0; n < thread; n++)
                        finished += seg[n].stopped ? 1 : 0;

                if (finished == thread)
                        break;

                YASSERT(attempt < 100);
                DWARN("stop %u --> %u, retry %u\n", finished, thread, attempt);
                sleep(1);
        }

        arg->status = arg->fail ? __FAILED__ : __STOPPED__;
        __castoff_dump(arg);
}

/*
 * Finish a castoff whose records were all cast off successfully.
 *
 * Destroys and removes the target disk (every online disk when
 * arg->disk == DISK_ALL_IDX), sets "castoff/done" and publishes __DONE__.
 * When the whole node was removed, the process exits here with status 1.
 * Errors other than ENODEV from diskmd_destroy() are fatal (YASSERT).
 */
static int __castoff_done(arg_t *arg)
{
        int ret, online;
        uint32_t i;

        YASSERT(arg->success == arg->castoff);

        for (i = 0; i < DISK_MAX; i++) {
                if (i != arg->disk && arg->disk != DISK_ALL_IDX)
                        continue;

                ret = diskmd_online(i, &online);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* skip disks that are not online */
                if (!online)
                        continue;

                ret = diskmd_destroy(i);
                if (unlikely(ret)) {
                        /* ENODEV is tolerated (presumably the disk is already gone) */
                        if (ret == ENODEV)
                                continue;

                        GOTO(err_ret, ret);
                }

                diskmd_remove(i);
        }

        __castoff_set_done();
        arg->status = __DONE__;
        __castoff_dump(arg);

        if (arg->disk == DISK_ALL_IDX) {
                DINFO("del node finished, so exit\n");
                exit(1);
        }

        return 0;
err_ret:
        YASSERT(0);
        return ret;
}

/*
 * One polling step of the coordinator loop in __castoff_worker().
 *
 * Re-aggregates the success/fail counters of all worker slices into arg and
 * publishes them with __castoff_dump(). It then decides whether the run is over:
 *  - all records succeeded             -> __castoff_done(), return 1;
 *  - any record failed                 -> __castoff_stop() (__FAILED__), return 1;
 *  - "castoff/stop" is set             -> __castoff_stop(), return 1;
 *  - the launcher process has gone away -> __castoff_stop(), return 1.
 * Returns 0 when the castoff should keep running.
 */
static int __castoff_worker_check(arg_t *arg, castoff_seg_t *_segs, int thread)
{
        int stop, running, i;
        castoff_seg_t *seg;

        arg->success = 0;
        arg->fail = 0;
        running = 0;
        for (i = 0; i < thread; i++) {
                seg = &_segs[i];
                arg->success += seg->success;
                arg->fail += seg->fail;
                running += seg->running;
        }

        DINFO("castoff %llu success %llu fail %llu, running %u\n",
              (LLU)arg->castoff, (LLU)arg->success, (LLU)arg->fail, running);

        __castoff_dump(arg);

        YASSERT(arg->success + arg->fail <= arg->castoff);
        if (arg->success == arg->castoff) {
                __castoff_done(arg);
                return 1;
        } else if (arg->fail) {
                DINFO("castoff fail...\n");
                __castoff_stop(arg, _segs, thread);
                return 1;
        }

        /* NOTE(review): seg->running is set at startup and not cleared in the
         * code visible here, so this effectively only checks thread > 0. */
        YASSERT(running);

        __castoff_get_stop(&stop);
        if (stop) {
                DINFO("castoff stop...\n");
                __castoff_stop(arg, _segs, thread);
                return 1;
        }

        __castoff_get_launch_stop(&stop);
        if (stop) {
                DINFO("castoff stop with launch ...\n");
                __castoff_stop(arg, _segs, thread);
                return 1;
        }

        return 0;
}

/*
 * Background thread body started by castoff_disk(). _arg carries the disk
 * index.
 *
 * Scans the disk into a record file, maps it, and runs worker threads that
 * cast the chunks off. It polls once per second until
 * __castoff_worker_check() reports the run finished. A change of
 * "castoff/thread" restarts the workers with the new thread count.
 * The deleting mark is always cleared at the end.
 */
static void *__castoff_worker(void *_arg)
{
        int ret, thread, _thread, _thread_new;
        uint32_t idx;
        arg_t arg;
        castoff_seg_t _segs[THREAD_MAX];

        idx = (uint32_t)(uintptr_t)_arg;

        DINFO("delete disk idx: %d\n", idx);

        __castoff_scan(&arg, idx);

        if (arg.castoff == 0) {
                DINFO("node clean\n");
                /* nothing was recorded: release the empty record file */
                close(arg.fd);
                __castoff_done(&arg);
                goto out;
        }

        // Map the records of every chunk that must be moved off the disk
        arg.addr = mmap(NULL, arg.castoff * sizeof(res_t), PROT_READ, MAP_PRIVATE, arg.fd, 0);
        if (arg.addr == MAP_FAILED) {
                ret = errno;
                DERROR("mmap castoff records fail, ret %d\n", ret);
                UNIMPLEMENTED(__DUMP__);
        }

retry:
        __castoff_get_thread(&_thread);

        // Start the cast-off worker threads
        __castoff_worker_startup(&arg, _segs, _thread, &thread);

        // Coordinator loop
        while (1) {
                sleep(1);

                ret = __castoff_worker_check(&arg, _segs, thread);
                if (unlikely(ret))
                        break;

                __castoff_get_thread(&_thread_new);
                if (_thread != _thread_new) {
                        DINFO("castoff interrupted by new thread argument\n");
                        __castoff_stop(&arg, _segs, thread);
                        goto retry;
                }
        }

        munmap(arg.addr, arg.castoff * sizeof(res_t));
        close(arg.fd);

out:
        __castoff_set_deleting(idx, 0);
        return NULL;
}

/**
 * Remove a disk (or the whole node) from service by casting all of its
 * chunks off to other locations. Triggered by `lich.node --disk_del <device>`.
 * Both the trigger and the processing differ from pulling a disk out.
 *
 * Basic flow:
 * 1. Scan all chunk records on the disk and store them in a temporary file.
 * 2. Start a group of threads, each recovering one slice of the records.
 *
 * @pre writeback_flush stopped
 *
 * @note Ordinary disks and cache disks are handled differently; in the
 *       database records they correspond to the disk and wbdisk fields.
 *
 * @note Open question: what happens if a fault occurs at any stage of this
 *       procedure, and is the procedure reentrant?
 *
 * @see data recovery, HSM
 *
 * @param diskid disk index to remove, or DISK_ALL_IDX for the whole node
 * @return 0 once the background worker has been started; EPERM if a
 *         castoff is already running; otherwise an error from
 *         __castoff_set_deleting() or pthread_create().
 */
int castoff_disk(int diskid)
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        // Only one castoff may run on a node at a time.
        // NOTE(review): this check-then-set of __deleting__ is not atomic;
        // confirm that callers are serialized.
        if (__deleting__) {
                DERROR("was running\n");
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        DINFO("castoff: %d\n", diskid);

        // Set the deleting mark so no more data is written to the disk
        ret = __castoff_set_deleting(diskid, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __castoff_set_init();

        /* wait for the deleting mark to take effect */
        sleep(7);

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __castoff_worker, (void *)(uintptr_t)diskid);
        /* the attribute object is not needed after creation; release it on every path */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_deleting, ret);

        return 0;
err_deleting:
        __castoff_set_deleting(diskid, 0);
err_ret:
        return ret;
}
